-
Notifications
You must be signed in to change notification settings - Fork 157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
重构TaskPool实现 #80 中讨论的新需求 #93
Conversation
Signed-off-by: longyue0521 <longyueli0521@gmail.com>
Codecov Report
@@ Coverage Diff @@
## dev #93 +/- ##
==========================================
- Coverage 96.31% 96.26% -0.05%
==========================================
Files 23 23
Lines 895 1044 +149
==========================================
+ Hits 862 1005 +143
- Misses 26 30 +4
- Partials 7 9 +2
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
Signed-off-by: longyue0521 <longyueli0521@gmail.com>
可以画一个流程图吗?我确实有点没太看懂 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
按照我的理解:
- 要不要开 goroutine:应该是在 submit task 的时候。每次 submit 之后看一下队列和goroutine 的数量,然后决定开不开;
- 要不要关掉 goroutine 是在每个 goroutine 从队列里面拿 task 的时候。相当于最大空闲时间创建一个 context,作为一个 case,和队列里面拿一个 task 作为一个 case,进行 select
- 在开 goroutine 的时候,每次应该最多开一个。而 initGo 应该是在 Start 的时候就创建起来了
- 在关 goroutine 的时候,则需要判断有没有下降到 initGo。
pool/task_pool.go
Outdated
numGoRunningTasks int32 | ||
|
||
totalNumOfGo int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这两个可以稍微对齐一下下面的 initGo, coreGo, maxGo?比如说 runningGo, totalGo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
numGoRunningTasks 表示有多少个goroutine处于case task, ok := <-queue
分支中, 要不workingGo?
totalNumOfGo已经改为totalGo
submit根据需求是可以在未start的情况下直接调用的即一次性填满队列再调用start若此后不再submit任何任务,那么此时“开新协程”的逻辑在start种也要有.
为什么每次应该最多开一个? 这种决策处于什么考量? 举例,initGo=10, coreGo=20, maxGo=30,queueSize = 50,用户在未start的情况下将queue填满,再调用start并在此后不再submit任何任务.此时start除了要开initGo个协程外(此时totalGo=initGo=10),还要再开多少个协程?怎么开,一个一个开还是开多个?什么时机开,submit不再调用,start一旦返回不可重复调用?
依照上面的例子,当
func goroutine() {
ctx, _ := context.WithTimeout(context.Background(), maxIdleTime)
select {
case <-ctx.Done():
// 根据最大空闲时间创建的ctx
return
case task, ok := <-b.queue:
// 执行任务
}
} 是上面这段伪代码的意思吗? 如果是,你把context当成现有代码中的idleTimer就行了,思路是一样的. 之所以用timer而不是context详见 #65 以及“最大空闲时间”的定义.当前协程如果处在(初始,核心]范围内并且一直没有抢到任务进入 判断当前goroutine是否要关闭的时机是在
|
先看最新提交,具体说说哪里没看懂 |
我草,我都忘了可以提交完任务之后再开始了,你忽略 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
稍等一下,我试试在你的合并请求的基础上,我看看能不能去掉 timeoutGroup 和 timer,因为这两个是代码难以理解的根源。
基本想法是在进来 goroutine 方法的时候使用 context 来控制最大空闲时间,然后每次超时看一下自己是不是能够被关掉,能的话就关掉。
if b.coreGo != b.initGo && b.maxGo == b.initGo { | ||
b.maxGo = b.coreGo | ||
} else if b.coreGo == b.initGo && b.maxGo != b.initGo { | ||
b.coreGo = b.maxGo | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我觉得这个调整和下面的校验,可以只保留一种做法。也就是说,如果你传入的参数不对,我自己去调整对,这是一种做法;另外一种做法就是我完全不会帮你调整,直接报错。
这里面就相当于,我调整了,但是我只调整了一点点
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我希望WithCoreGo(coreGo)和WithMaxGo(maxGo)是相互对立的且可以单独使用的,所以这里的调整是必要的.
默认情况下initGo=coreGo=maxGo, 当用户只使用WithCoreGo(coreGo)时,走下面的校验就可能会因为initGo=maxGo < coreGo报错, 这样用户就会比较困惑.
起初我考虑过使用context但想到上次的讨论 #65 以及这次需求场景才改为timer的. 有这样一种情况,当执行流程进入 timer也符合语义,运行完任务就得到一个计时器——如果得到的是stop的timer,那它就是initGo区间内的常驻协程.如果是未stop的timer,倒计时到了你就退出. |
timeoutGroup原本是一个int32原子变量用于计数,用 |
确实没办法去掉。我先合并了,后面我准备弄一个模糊测试来测试长时间运行的这个 pool 会不会出现没有调度或者死锁之类的问题 |
根据 #80 中讨论尤其是 讨论1和讨论2
最终我没有采取在讨论2提到的按协程分类创建策略.原因如下:
本次重构采用的是在协程运行任务结束后,根据当前协程总数(totalNumOfGo)与初始数(10, initGo),核心数(20, coreGo)的关系来动态随机划分协程.并保证最终稳定在“初始数”即10个. 算法如下:
b.coreGo < b.totalNumOfGo && (len(b.queue) == 0 || int32(len(b.queue)) < b.totalNumOfGo)
则更新协程总数并直接退出b.initGo < b.totalNumOfGo-b.timeoutGroup.size()
那么当前协程会被设置定时器,在“最大空闲时间”内没接到任务会因为超时而退出.timeoutGroup是超时组,有两个作用:
b.totalNumOfGo-b.timeoutGroup.size()
能够准确反应结果保证动态分类的正确性